{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Spark Tutorials - Learning Apache Spark\n",
    "\n",
    "**`Apache Spark`** is a framework for large-scale data processing. Many traditional frameworks were designed to run on a single computer. However, many datasets today are too large to be stored on a single computer, and even when a dataset fits on one machine, it can often be processed much more quickly using multiple computers."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## `Transformation`\n",
    "`map(), mapPartitions(), mapPartitionsWithIndex(), filter(), flatMap(), reduceByKey(), groupByKey()`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 78,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "4"
      ]
     },
     "execution_count": 78,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Create an RDD containing the integers 1 through 20\n",
    "dataRDD = sc.parallelize(range(1, 21))\n",
    "\n",
    "# Check how many partitions the RDD is split into using getNumPartitions()\n",
    "dataRDD.getNumPartitions()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 60,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "'(4) PythonRDD[49] at RDD at PythonRDD.scala:43 []\\n |  ParallelCollectionRDD[47] at parallelize at PythonRDD.scala:423 []'"
      ]
     },
     "execution_count": 60,
     "metadata": {},
     "output_type": "execute_result"
    }
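   ],
   "source": [
    "# Subtract 1 from each number and find the max (not echoed: only a cell's last expression is displayed)\n",
    "dataRDD.map(lambda x: x - 1).max()\n",
    "# Show the RDD's lineage as a debug string\n",
    "dataRDD.toDebugString()"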
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 79,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "4\n",
      "110\n",
      "110\n"
     ]
    }
   ],
   "source": [
    "# Confirm the number of partitions\n",
    "print(dataRDD.getNumPartitions())\n",
    "\n",
    "# Find the even numbers\n",
    "evenRDD = dataRDD.filter(lambda x: x % 2 == 0)\n",
    "\n",
    "# Reduce by adding up all values in the RDD\n",
    "print(evenRDD.reduce(lambda x, y: x + y))\n",
    "\n",
    "# Use Python's operator.add function to sum instead of a lambda\n",
    "from operator import add\n",
    "print(evenRDD.reduce(add))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "collapsed": true
   },
   "source": [
    "## `Action`\n",
    "`first(), take(), takeSample(), takeOrdered(), collect(), count(), countByValue(), reduce(), top()`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 80,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]"
      ]
     },
     "execution_count": 80,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Take first n values\n",
    "evenRDD.take(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 81,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "defaultdict(int,\n",
       "            {2: 1, 4: 1, 6: 1, 8: 1, 10: 1, 12: 1, 14: 1, 16: 1, 18: 1, 20: 1})"
      ]
     },
     "execution_count": 81,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Count the occurrences of each distinct value; returns a dictionary mapping value to count\n",
    "evenRDD.countByValue()"
   ]
  },
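  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## `reduceByKey()`, `combineByKey()` and `foldByKey()` are better than `groupByKey()`!\n",
    "These operations combine the values for each key within every partition before the shuffle, so far less data moves across the network than with `groupByKey()`, which shuffles every key-value pair."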
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 91,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[('a', [1, 2]), ('b', [1])]\n",
      "[('a', 3), ('b', 1)]\n",
      "[('a', 3), ('b', 1)]\n",
      "[('a', 3), ('b', 1)]\n"
     ]
    }
   ],
   "source": [
    "pairRDD = sc.parallelize([('a', 1), ('a', 2), ('b', 1)])\n",
    "\n",
    "# mapValues(list) is only used to make the grouped values printable\n",
    "print(pairRDD.groupByKey().mapValues(lambda x: list(x)).collect())\n",
    "\n",
    "# Different ways to sum by key\n",
    "print(pairRDD.groupByKey().map(lambda kv: (kv[0], sum(kv[1]))).collect())\n",
    "\n",
    "# Using mapValues, which is recommended when the key doesn't change\n",
    "print(pairRDD.groupByKey().mapValues(lambda x: sum(x)).collect())\n",
    "\n",
    "# reduceByKey is more efficient and scalable because it combines values before the shuffle\n",
    "print(pairRDD.reduceByKey(add).collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## `mapPartitions()`\n",
    "The mapPartitions() transformation uses a function that takes an iterator over the items in one partition and returns an iterator. The function is applied once per partition rather than once per element."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 93,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['cat', 'elephant', 'rat', 'rat', 'cat']\n",
      "['cat', 'elephant', 'rat', 'rat,cat']\n"
     ]
    }
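   ],
   "source": [
    "# Define wordsRDD with 4 partitions so this cell runs on its own\n",
    "wordsRDD = sc.parallelize(['cat', 'elephant', 'rat', 'rat', 'cat'], 4)\n",
    "print(wordsRDD.collect())\n",
    "\n",
    "# mapPartitions takes a function that receives an iterator and returns an iterable;\n",
    "# here each partition's words are joined into a single comma-separated string\n",
    "itemsRDD = wordsRDD.mapPartitions(lambda iterator: [','.join(iterator)])\n",
    "\n",
    "print(itemsRDD.collect())"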
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## `mapPartitionsWithIndex()`\n",
    "The mapPartitionsWithIndex() transformation uses a function that takes a partition index (think of it as the partition number) and an iterator over the items in that partition, and returns an iterable of output items. In the example that follows, the function returns one tuple per partition: the partition index paired with a list of the items in that partition."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 94,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[(0, ['cat']), (1, ['elephant']), (2, ['rat']), (3, ['rat', 'cat'])]\n",
      "[0, ['cat'], 1, ['elephant'], 2, ['rat'], 3, ['rat', 'cat']]\n"
     ]
    }
   ],
   "source": [
    "# Wrap the (index, items) tuple in a list so each partition yields exactly one element\n",
    "itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: [(index, list(iterator))])\n",
    "\n",
    "# Three of the partitions hold one element and the fourth holds two,\n",
    "# although things may not bode well for the rat...\n",
    "print(itemsByPartRDD.collect())\n",
    "\n",
    "# Rerun without wrapping in a list: Spark iterates over the returned tuple itself,\n",
    "# so the index and the list become separate elements (acts more like flatMap)\n",
    "itemsByPartRDD = wordsRDD.mapPartitionsWithIndex(lambda index, iterator: (index, list(iterator)))\n",
    "\n",
    "print(itemsByPartRDD.collect())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## `Others`\n",
    "`cache(), unpersist(), id(), setName()`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 104,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "def brokenTen(value):\n",
    "    \"\"\"Return whether `value` is less than ten.\n",
    "\n",
    "    Note:\n",
    "        An earlier version of the `if` statement checked an undefined variable `val`\n",
    "        instead of `value`, which raises a `NameError` when the function is called.\n",
    "        That line is kept below, commented out, for reference.\n",
    "\n",
    "    Args:\n",
    "        value (int): A number.\n",
    "\n",
    "    Returns:\n",
    "        bool: Whether `value` is less than ten.\n",
    "    \"\"\"\n",
    "    # if (val < 10):  # original bug: `val` is undefined\n",
    "    return value < 10\n",
    "\n",
    "brokenRDD = dataRDD.filter(brokenTen)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 117,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[1, 2, 3, 4, 5, 6, 7, 8, 9]"
      ]
     },
     "execution_count": 117,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "brokenRDD.collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Word Count & Related Processing Lab"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Word Count"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 110,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "<class 'pyspark.rdd.RDD'>\n"
     ]
    }
   ],
   "source": [
    "wordslist = ['cat', 'elephant', 'rat', 'rat', 'cat']\n",
    "wordsRDD = sc.parallelize(wordslist)\n",
    "print(type(wordsRDD))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Pluralize words"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 114,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['cats', 'elephants', 'rats', 'rats', 'cats']"
      ]
     },
     "execution_count": 114,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Pluralize each word\n",
    "pluralwords = wordsRDD.map(lambda w: w +'s').collect()\n",
    "pluralwords"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Length of words"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 116,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[4, 9, 4, 4, 4]"
      ]
     },
     "execution_count": 116,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Find length of each word\n",
    "pluralRDD = sc.parallelize(pluralwords)\n",
    "pluralRDD.map(len).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Word Count, Key-Value Pair"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 121,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('cats', 2), ('rats', 2), ('elephants', 1)]"
      ]
     },
     "execution_count": 121,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Count each word with countByValue()\n",
    "pluralRDD.countByValue().items()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 128,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('cats', 2), ('rats', 2), ('elephants', 1)]"
      ]
     },
     "execution_count": 128,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Map each word to a (word, 1) pair, then count the pairs per key with countByKey()\n",
    "pluralRDD.map(lambda d: (d, 1)).countByKey().items()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 148,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "[('cats', 1), ('elephants', 1), ('rats', 1), ('rats', 1), ('cats', 1)]\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "[('rats', 2), ('cats', 2), ('elephants', 1)]"
      ]
     },
     "execution_count": 148,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Map each word to a (word, 1) pair, then sum the counts per key with reduceByKey()\n",
    "newPluralRDD = pluralRDD.map(lambda d: (d, 1))\n",
    "print(newPluralRDD.collect())\n",
    "\n",
    "newPluralRDD.reduceByKey(add).collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Unique Words in RDD"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 157,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['cats', 'elephants', 'rats', 'rats', 'cats']\n"
     ]
    },
    {
     "data": {
      "text/plain": [
       "['rats', 'cats', 'elephants']"
      ]
     },
     "execution_count": 157,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "print(pluralRDD.collect())\n",
    "pluralRDD.distinct().collect()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Find the mean count per unique word"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 177,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "1.67"
      ]
     },
     "execution_count": 177,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "# Mean occurrences per unique word = total words / distinct words\n",
    "total = pluralRDD.count()\n",
    "size = float(pluralRDD.distinct().count())\n",
    "round(total / size, 2)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Apply `wordCount()` to a File"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 372,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('rat', 2), ('elephant', 1), ('cat', 2)]"
      ]
     },
     "execution_count": 372,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "def wordCount(wordsListRDD):\n",
    "    \"\"\"Take an RDD of words and return (word, count) pairs.\"\"\"\n",
    "    return wordsListRDD.countByValue().items()\n",
    "\n",
    "wordCount(wordsRDD)\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 373,
   "metadata": {
    "collapsed": false
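   },
   "outputs": [],
   "source": [
    "\n",
    "import re\n",
    "\n",
    "# Characters to strip from each token: punctuation, brackets, quotes and whitespace\n",
    "pattern = re.compile(r\"[.,\\[\\]\\\"'*!?`_\\s();-]+\")\n",
    "\n",
    "# Read the file, split on whitespace, then strip punctuation and lowercase each token\n",
    "with open(\"alice.txt\", 'r') as f:\n",
    "    wordsList = [pattern.sub('', token).lower() for token in f.read().split()]\n",
    "\n",
    "# Drop tokens that became empty after stripping\n",
    "wordsList = [word for word in wordsList if word]\n"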
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 381,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[('the', 1632),\n",
       " ('and', 845),\n",
       " ('to', 721),\n",
       " ('a', 627),\n",
       " ('she', 537),\n",
       " ('it', 517),\n",
       " ('of', 508),\n",
       " ('said', 459),\n",
       " ('i', 401),\n",
       " ('alice', 379),\n",
       " ('in', 366),\n",
       " ('you', 361),\n",
       " ('was', 355),\n",
       " ('that', 273),\n",
       " ('as', 262),\n",
       " ('her', 243),\n",
       " ('at', 210),\n",
       " ('on', 191),\n",
       " ('with', 180),\n",
       " ('had', 178),\n",
       " ('all', 178),\n",
       " ('but', 166),\n",
       " ('for', 153),\n",
       " ('so', 150),\n",
       " ('be', 146),\n",
       " ('not', 144),\n",
       " ('very', 144),\n",
       " ('what', 136),\n",
       " ('this', 130),\n",
       " ('little', 128)]"
      ]
     },
     "execution_count": 381,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "\n",
    "wordsFileRDD = sc.parallelize(wordsList)\n",
    "p_wordfile = wordCount(wordsFileRDD)\n",
    "\n",
    "# Print top 30 words\n",
    "sorted(p_wordfile, key=lambda tup:tup[1], reverse=True)[:30]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 2",
   "language": "python",
   "name": "python2"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 2
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython2",
   "version": "2.7.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
